package com.lianda.connectors.sink;

import com.lianda.connectors.utils.ExecutionEnvUtil;
import com.lianda.connectors.utils.GsonUtil;
import com.lianda.connectors.utils.KafkaConfigUtil;
import com.lianda.model.Student;
import com.lianda.utils.ESSinkUtil;
import com.lianda.utils.RestClientFactoryImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static com.lianda.constant.PropertiesConstants.*;


/**
 * Writes data into Elasticsearch.
 * Reference: https://www.jianshu.com/p/3957d35a0739
 *
 * Check whether the index was created:
 * http://localhost:9200/_cat/indices?v
 *
 * Check whether data was written into the index:
 * http://localhost:9200/student-index/_search?pretty
 */
@Slf4j
public class Sink2ES6Main {

    /** Kafka topic that carries student records as JSON strings. */
    private static final String READ_TOPIC = "student";

    /** Defaults used when no Elasticsearch override is supplied via configuration. */
    private static final String DEFAULT_ES_HOST = "127.0.0.1";
    private static final int DEFAULT_ES_PORT = 9200;

    /**
     * Flink job entry point: consumes student JSON strings from the Kafka topic
     * {@code student} and indexes each record into Elasticsearch 6 under
     * {@code student-index/student}.
     *
     * @param args command-line arguments (unused; configuration comes from
     *             {@link ExecutionEnvUtil#PARAMETER_TOOL})
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;

        Properties props = KafkaConfigUtil.buildKafkaProps(parameterTool);

        // Read raw student JSON from Kafka.
        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<String>(
                READ_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);

        // Echo incoming records to stdout for debugging.
        student.print();
        // Parameterized logging instead of concatenating the stream object itself,
        // whose toString() carries no useful information.
        log.info("Consuming Kafka topic '{}'", READ_TOPIC);

        // ES endpoint is configurable; falls back to the original local defaults.
        List<HttpHost> esAddresses = new ArrayList<>();
        esAddresses.add(new HttpHost(
                parameterTool.get("elasticsearch.host", DEFAULT_ES_HOST),
                parameterTool.getInt("elasticsearch.port", DEFAULT_ES_PORT),
                "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                esAddresses,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        log.info("data===>{}", s);
                        // XContentType.JSON is required so ES parses the payload as JSON.
                        requestIndexer.add(Requests.indexRequest()
                                .index("student-index")
                                .type("student")
                                .source(s, XContentType.JSON));
                    }
                });

        // Flush after every single record — convenient for a demo, too chatty for production.
        esSinkBuilder.setBulkFlushMaxActions(1);
        // Custom rest client factory; needed when the target index does not exist yet.
        esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
        // Retry bulk requests rejected by ES's thread-pool queue instead of failing the job.
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        student.addSink(esSinkBuilder.build());
        env.execute("flink learning connectors es6");
    }
}
